19 实践课-传感器仿真配置与 ROS2 数据交互实现

- 传感器仿真配置与 ROS2 数据交互实现

关联:索引

术语约定(避免混淆):

配套工程(与本代码一一对应):

在 Ubuntu/ROS2 Humble 下的最小运行命令(给证据):

cd ./05_sensor_sim_bridge_control/ros2_ws
colcon build --symlink-install
source install/setup.bash
ros2 launch sensor_sim_bridge_control bringup.launch.py

强制软件渲染(用于无 GPU/驱动不兼容/远程桌面场景):

ros2 launch sensor_sim_bridge_control bringup.launch.py libgl_always_software:=1

取消强制软件渲染(优先硬件渲染,性能更好):

ros2 launch sensor_sim_bridge_control bringup.launch.py libgl_always_software:=0

逐行解释:


  1. 相机在仿真中加载成功(GUI 画面/日志中无传感器加载失败)。
  2. ROS2 中能看到相机相关话题(ros2 topic list 证据)。
  3. 能证明“真的在传数据”(优先用 ros2 topic hz --qos-profile sensor_dataCameraInfo 可用 ros2 topic echo --once 取一帧证据)。

1)相机 SDF 最小模板(可直接粘贴对照)

<model name="industrial_camera_rig">
  <static>true</static>
  <pose>0 0 1.2 0 0 0</pose>
  <link name="camera_link">
    <sensor name="industrial_rgb" type="camera">
      <pose>0 0 0 0 0 0</pose>
      <always_on>true</always_on>
      <update_rate>30</update_rate>
      <visualize>true</visualize>
      <topic>/sim/camera/rgb</topic>
      <camera>
        <horizontal_fov>1.047</horizontal_fov>
        <image>
          <width>1280</width>
          <height>720</height>
          <format>R8G8B8</format>
        </image>
        <clip>
          <near>0.1</near>
          <far>8.0</far>
        </clip>
        <noise>
          <type>gaussian</type>
          <mean>0.0</mean>
          <stddev>0.001</stddev>
        </noise>
      </camera>
    </sensor>
  </link>
</model>

逐段解释(你要能口头讲清):

2)内参自检:用量级检查避免“参数离谱但能跑”

用“等效焦距量级”做快速自检(只用于检查量级,不要求推导):

fx ≈ (width / 2) / tan(horizontal_fov / 2)

怎么用:

目标:把相机光轴对准传送带与分拣区,避免“相机加载了但画面全是地面/天空”。

  1. 先只改 world pose(粗对齐),让视野里出现传送带与目标物。
  2. 再改 sensor pose(微对齐),让目标落在画面中心附近。
  3. 每次只改一个轴或一个角度,并截图记录“参数→现象”。

2)最常见外参错误与快速定位

定位证据:

1)launch 组织原则(必须遵守)

2)Gazebo Sim(Fortress)常见写法:ros_gz_bridge 做数据桥接(模板)

先确认 Gazebo 内部 Topic 名称(避免“桥接目标写错但不报错”):

# 二选一:用你机器上可用的命令(Fortress 常见是 ign)
ign topic -l
gz topic -l

逐行解释:

进一步验证“Gazebo 端真的在发布”(每个话题取 1 条回显作为证据):

ign topic -e -t /sim/camera/rgb -n 1
ign topic -e -t /sim/camera/camera_info -n 1

逐行解释:

再确认桥接支持哪些“Gazebo 消息类型 ↔ ROS2 消息类型”映射(避免写了类型但桥接不支持):

ros2 run ros_gz_bridge parameter_bridge --print-pairs

逐行解释:

"""
一键启动:Gazebo 相机仿真 + Gazebo→ROS2 桥接 + 图像触发控制节点.

启动内容:
- ign gazebo:运行 worlds/sorting_demo.world(内含 camera sensor)
- ros_gz_bridge/parameter_bridge:
  - /sim/camera/rgb -> /camera/image_raw
  - /sim/camera/camera_info -> /camera/camera_info
- sensor_sim_bridge_control/image_trigger_arm:
  - 订阅 /camera/image_raw
  - 收到图像后向 /arm_forward_controller/commands 发布一次关节目标(默认 one_shot)

常见问题与对应参数:
- Gazebo 消息类型不同(ignition.msgs vs gz.msgs):
  - gz_image_msg_type / gz_camera_info_msg_type 可覆盖
- 想改 ROS2 侧输出话题名:
  - image_topic / camera_info_topic 可覆盖
"""

from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, ExecuteProcess, SetEnvironmentVariable
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution, PythonExpression
from launch_ros.actions import Node
from launch_ros.parameter_descriptions import ParameterValue
from launch_ros.substitutions import FindPackageShare

def generate_launch_description():
    # 全局开关:是否使用 /clock(仿真时间)。
    use_sim_time = LaunchConfiguration("use_sim_time")
    # world 文件路径(默认指向包内 worlds/sorting_demo.world)。
    world = LaunchConfiguration("world")
    # Gazebo 启动命令与子命令(默认:ign gazebo)。
    gazebo_cmd = LaunchConfiguration("gazebo_cmd")
    gazebo_subcmd = LaunchConfiguration("gazebo_subcmd")
    # 强制软件渲染开关(无 GPU 或驱动问题时常用)。
    libgl_always_software = LaunchConfiguration("libgl_always_software")

    # ROS 侧话题名:桥接后的图像与相机内参输出位置。
    image_topic = LaunchConfiguration("image_topic")
    camera_info_topic = LaunchConfiguration("camera_info_topic")
    # ROS 侧控制话题名:image_trigger_arm 节点发布关节指令的位置。
    command_topic = LaunchConfiguration("command_topic")
    # Gazebo Transport 消息类型(不同版本/发行版可能是 ignition.msgs.* 或 gz.msgs.*)。
    gz_image_msg_type = LaunchConfiguration("gz_image_msg_type")
    gz_camera_info_msg_type = LaunchConfiguration("gz_camera_info_msg_type")

    return LaunchDescription(
        [
            DeclareLaunchArgument("use_sim_time", default_value="true"),
            DeclareLaunchArgument("libgl_always_software", default_value="1"),
            DeclareLaunchArgument(
                "world",
                default_value=PathJoinSubstitution(
                    [
                        FindPackageShare("sensor_sim_bridge_control"),
                        "worlds",
                        "sorting_demo.world",
                    ]
                ),
            ),
            DeclareLaunchArgument("gazebo_cmd", default_value="ign"),
            DeclareLaunchArgument("gazebo_subcmd", default_value="gazebo"),
            DeclareLaunchArgument("image_topic", default_value="/camera/image_raw"),
            DeclareLaunchArgument(
                "camera_info_topic", default_value="/camera/camera_info"
            ),
            DeclareLaunchArgument(
                "command_topic", default_value="/arm_forward_controller/commands"
            ),
            DeclareLaunchArgument(
                "gz_image_msg_type",
                default_value="ignition.msgs.Image",
            ),
            DeclareLaunchArgument(
                "gz_camera_info_msg_type",
                default_value="ignition.msgs.CameraInfo",
            ),
            SetEnvironmentVariable(
                name="LIBGL_ALWAYS_SOFTWARE",
                value=libgl_always_software,
            ),
            ExecuteProcess(
                # -r:启动即运行(无需在 GUI 点 Play)
                # -v 4:输出更详细日志,便于排查资源加载与传感器发布问题
                cmd=[gazebo_cmd, gazebo_subcmd, "-r", "-v", "4", world],
                output="screen",
            ),
            Node(
                package="ros_gz_bridge",
                executable="parameter_bridge",
                name="camera_bridge",
                output="screen",
                arguments=[
                    # parameter_bridge 的参数是“单个字符串规则”:
                    # <gz_topic>@<ros_type>[<gz_type>
                    # 这里用 PythonExpression 在运行时拼接出完整字符串,避免把参数拆成多个片段。
                    PythonExpression(
                        [
                            "'/sim/camera/rgb@sensor_msgs/msg/Image[' + '",
                            gz_image_msg_type,
                            "'",
                        ]
                    ),
                    PythonExpression(
                        [
                            "'/sim/camera/camera_info@sensor_msgs/msg/CameraInfo[' + '",
                            gz_camera_info_msg_type,
                            "'",
                        ]
                    ),
                ],
                parameters=[{"use_sim_time": ParameterValue(use_sim_time, value_type=bool)}],
                remappings=[
                    # 将 bridge 输出的 ROS 话题重映射到用户自定义名称(默认 /camera/*)。
                    ("/sim/camera/rgb", image_topic),
                    ("/sim/camera/camera_info", camera_info_topic),
                ],
            ),
            Node(
                package="sensor_sim_bridge_control",
                executable="image_trigger_arm",
                name="image_trigger_arm",
                output="screen",
                parameters=[
                    {"use_sim_time": ParameterValue(use_sim_time, value_type=bool)},
                    {"image_topic": image_topic},
                    {"command_topic": command_topic},
                ],
            ),
        ]
    )

逐段解释(你需要能定位哪一行导致“桥接失败”):

重要提醒(避免“盲抄网络片段”):

先确认对象存在:

ros2 topic list
ros2 topic type /camera/image_raw
ros2 topic type /camera/camera_info

逐行解释:

再确认真的在传数据(给证据):

ros2 topic hz --qos-profile sensor_data /camera/image_raw
ros2 topic echo --once /camera/camera_info

逐行解释:

推荐的“图像可视化证据”(加分项,但更符合工程实际):

ros2 run image_tools showimage --ros-args -r image:=/camera/image_raw

逐行解释:

  1. 选择一个目标物(苹果/箱体/标签任意),确保画面能稳定看到它。
  2. 至少做 3 次外参微调(例如 yaw 微调、Z 高度微调),每次只改一个量。
  3. 输出证据:

快问快答(用证据说话):

  1. 你们组相机的 ROS2 话题名是什么?用哪条命令证明它有数据?
  2. 如果“话题存在但没数据”,你第一优先查哪三件事?

配套工程对应关系(重要):

ROS2 控制器(controller_manager) → 指令话题/服务 → 仿真控制插件(gazebo_ros2_control 或 gz_ros2_control) → 仿真关节/执行器

你需要能区分两类证据:

1)控制器 YAML(通用模板,先跑通 joint_state_broadcaster)

controller_manager:
  ros__parameters:
    update_rate: 100

    joint_state_broadcaster:
      type: joint_state_broadcaster/JointStateBroadcaster

    arm_forward_controller:
      type: forward_command_controller/ForwardCommandController

arm_forward_controller:
  ros__parameters:
    joints:
      - shoulder_pan_joint
      - shoulder_lift_joint
      - elbow_joint
      - wrist_1_joint
      - wrist_2_joint
      - wrist_3_joint
    interface_name: position

逐段解释:

2)Gazebo Classic 写法提示:gazebo_ros2_control(模板片段)

<gazebo>
  <plugin name="gazebo_ros2_control" filename="libgazebo_ros2_control.so">
    <parameters>$(find-pkg-share your_pkg)/config/controllers.yaml</parameters>
  </plugin>
</gazebo>

逐段解释:

3)Gazebo Sim(Fortress)写法提示:gz_ros2_control(模板片段)

<plugin filename="libgz_ros2_control-system.so" name="gz_ros2_control::GazeboSimROS2ControlPlugin">
  <ros>
    <namespace>/</namespace>
  </ros>
  <parameters>$(find-pkg-share your_pkg)/config/controllers.yaml</parameters>
</plugin>

逐段解释:

ros2 control list_controllers
ros2 topic echo --once /joint_states

逐行解释:

1)最小闭环节点(Python,示例:收到一帧图像就发布一次关节目标)

"""
图像触发机械臂指令发布节点.

功能概览:
- 订阅相机图像话题(sensor_msgs/msg/Image)
- 当满足触发条件时,向控制话题发布一条关节目标(std_msgs/msg/Float64MultiArray)

用途:
- 教学演示“传感器输入 -> 规则触发 -> 控制输出”的最小闭环
- 不做任何图像识别/AI 推理,仅用“收到图像”作为触发事件

参数(可通过 ros2 param set 修改):
- image_topic:订阅的图像话题(默认 /camera/image_raw)
- command_topic:发布控制指令的话题(默认 /arm_forward_controller/commands)
- target_positions:要发布的关节目标数组(通常 6 个关节)
- one_shot:是否只触发一次(默认 True)
- cooldown_sec:两次发布之间的冷却时间(秒)
- enabled:是否启用触发(默认 True)

服务:
- ~/reset(std_srvs/srv/Trigger):清空 one_shot 触发状态,允许再次发布
- ~/enable(std_srvs/srv/SetBool):启用/禁用触发
"""

import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.qos import (
    QoSHistoryPolicy,
    QoSProfile,
    QoSReliabilityPolicy,
    qos_profile_sensor_data,
)
from sensor_msgs.msg import Image
from std_msgs.msg import Float64MultiArray
from std_srvs.srv import SetBool, Trigger

class ImageTriggerArm(Node):
    def __init__(self):
        super().__init__("image_trigger_arm")

        # 订阅图像输入与发布控制输出的 ROS 话题名。
        self.declare_parameter("image_topic", "/camera/image_raw")
        self.declare_parameter("command_topic", "/arm_forward_controller/commands")
        # 目标关节位置(示例:6 轴机械臂)。
        self.declare_parameter(
            "target_positions", [0.0, -1.0, 1.2, -1.2, 0.0, 0.0]
        )
        # one_shot=true:收到第一帧图像后只发布一次(常用于演示)。
        self.declare_parameter("one_shot", True)
        # 冷却时间:用于限制发布频率(秒)。
        self.declare_parameter("cooldown_sec", 0.0)
        # enabled=false:禁用触发逻辑(仍会保持订阅/发布器存在)。
        self.declare_parameter("enabled", True)

        self._sent_once = False
        self._last_sent_ns: int | None = None

        # 控制指令 QoS:使用 RELIABLE,确保下游控制器更不容易丢指令。
        command_qos = QoSProfile(
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=1,
            reliability=QoSReliabilityPolicy.RELIABLE,
        )

        # 发布器:向 command_topic 发布 Float64MultiArray。
        self._publisher = self.create_publisher(
            Float64MultiArray,
            self.get_parameter("command_topic").value,
            command_qos,
        )

        # 订阅器:相机数据属于典型传感器流,采用 qos_profile_sensor_data(Best Effort)。
        self._subscription = self.create_subscription(
            Image,
            self.get_parameter("image_topic").value,
            self._on_image,
            qos_profile_sensor_data,
        )

        # 服务:用于重置触发状态 / 启用禁用触发。
        self._reset_srv = self.create_service(Trigger, "~/reset", self._on_reset)
        self._enable_srv = self.create_service(SetBool, "~/enable", self._on_enable)

    def _on_enable(self, request: SetBool.Request, response: SetBool.Response):
        # 将 enabled 参数更新为请求值(True/False)。
        self.set_parameters(
            [Parameter("enabled", Parameter.Type.BOOL, bool(request.data))]
        )
        response.success = True
        response.message = "enabled=true" if request.data else "enabled=false"
        return response

    def _on_reset(self, request: Trigger.Request, response: Trigger.Response):
        # 清除 one_shot 状态与上一次发送时间,使下一帧图像可以再次触发发布。
        self._sent_once = False
        self._last_sent_ns = None
        response.success = True
        response.message = "reset ok"
        return response

    def _on_image(self, msg: Image):
        # 说明:msg 本身未被解析,这里只把“收到图像”当作触发事件。
        if not self.get_parameter("enabled").value:
            return

        if self.get_parameter("one_shot").value and self._sent_once:
            return

        cooldown_ns = int(float(self.get_parameter("cooldown_sec").value) * 1e9)
        now_ns = self.get_clock().now().nanoseconds

        if self._last_sent_ns is not None and cooldown_ns > 0:
            if now_ns - self._last_sent_ns < cooldown_ns:
                return

        target_positions = list(self.get_parameter("target_positions").value)
        if len(target_positions) == 0:
            self.get_logger().error("target_positions is empty, skip publish.")
            return

        # 发布控制指令:将 target_positions 填入 Float64MultiArray。
        cmd = Float64MultiArray()
        cmd.data = target_positions
        self._publisher.publish(cmd)
        self._last_sent_ns = now_ns
        self._sent_once = True
        self.get_logger().info(f"Published {len(cmd.data)} joint positions.")

def main(args=None):
    rclpy.init(args=args)
    node = ImageTriggerArm()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == "__main__":
    main()

逐段解释(确保你能改成你们组的关节数/话题名):

ros2 topic hz --qos-profile sensor_data /camera/image_raw
ros2 topic echo /arm_forward_controller/commands
ros2 service call /image_trigger_arm/reset std_srvs/srv/Trigger "{}"

逐行解释:

如果想持续触发便于调试:

ros2 param set /image_trigger_arm one_shot false
ros2 control list_controllers
ros2 topic echo --once /joint_states
  1. 图像频率:ros2 topic hz --qos-profile sensor_data /camera/image_raw(是否稳定接近 update_rate)。
  2. 指令频率/触发:ros2 topic echo --once /arm_forward_controller/commands(事件是否按预期发生)。
  3. 系统负载迹象:观察终端卡顿、桥接日志延迟、GUI 帧率下降(记录现象与当时参数)。

常见优化方向(先改最便宜的):

1)AI 生成任务提示词模板(可直接复制)

你是 ROS2 Humble + Gazebo Fortress 工程助手。请生成:
1) 一个 ROS2 launch.py:启动仿真、设置 use_sim_time、桥接相机图像与 CameraInfo 到 /camera/image_raw 与 /camera/camera_info;
2) 一段 SDF 相机配置:包含分辨率、horizontal_fov、clip、update_rate、topic;
约束:所有话题名可通过 launch 参数改名;代码必须可运行且不依赖未说明的第三方库;给出每一步验收命令(ros2 topic list/type/hz)。

校验点(你必须人工检查):

2)AI 排错提示词模板(带证据输入)

我在 ROS2 Humble + Gazebo 上桥接相机数据失败。
现象:ros2 topic list 能看到 /camera/image_raw,但 ros2 topic hz 为 0。
我贴出三段证据:
1) SDF 相机片段(含 <topic> 与 update_rate)
2) 桥接节点启动命令/launch 片段(含映射类型)
3) 终端报错日志(完整粘贴)
请按“现象→可能原因→最小修改→复验命令”给出排查方案,并给出你最推荐的 3 个最小修改顺序。
  1. 完成工业相机仿真配置,实现图像数据 ROS2 话题发布。
  2. 配置(gazebo_ros2_control 或 gz_ros2_control)插件,订阅设备状态、下发基础控制指令(传送带启停或机械臂关节运动二选一,鼓励两者都做)。
  3. 完成分拣场景基础闭环测试,验证相机采集数据驱动机械臂运动的效果。
  4. 使用 AI 辅助解决至少 1 个数据交互问题,记录交互过程与解决方案(必须包含证据与复验命令)。

  1. 生成工业相机配置 launch 文件模板、传感器数据发布/桥接代码。
  2. 生成控制插件(gazebo_ros2_control 或 gz_ros2_control)配置示例与控制器 YAML 示例。
  3. 针对数据交互报错(话题无数据、指令失效),定位问题并给出可复现调试方案(含最小修改顺序与复验命令)。
  4. 生成闭环测试流程清单与延迟优化建议(必须可量化、可复验)。

作业(布置)

1)提交工业相机配置文件、图像数据发布测试截图(含 ros2 topic echo --once /camera/camera_inforos2 topic hz /camera/image_raw 输出)。

2)提交设备控制指令下发的测试报告,包含指令内容、设备响应效果、测试中问题及解决方法(300 字左右)。

3)提交闭环测试成功的视频 / 截图,附闭环流程优化说明(重点说明响应延迟、数据精度的优化措施)。


Markdown 与代码自检清单(提交前必须过一遍)